{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Apache Beam Portability Framework WordCount example\n",
    "\n",
    "This notebook demonstrates how to run the Beam [wordcount.py](https://github.com/apache/beam/blob/v2.14.0/sdks/python/apache_beam/examples/wordcount.py) in Hopsworks.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Do the imports and start the Beam jobservice"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import logging\n",
    "import os\n",
    "import re\n",
    "\n",
    "import apache_beam as beam\n",
    "from apache_beam.io import ReadFromText\n",
    "from apache_beam.io import WriteToText\n",
    "from apache_beam.metrics import Metrics\n",
    "from apache_beam.metrics.metric import MetricsFilter\n",
    "from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, SetupOptions, HadoopFileSystemOptions, PortableOptions, WorkerOptions, DebugOptions\n",
    "\n",
    "from hops import beam as hops_beam\n",
    "from hops import hdfs as hopsfs\n",
    "\n",
    "# Start the Beam jobservice with an explicit task manager heap size.\n",
    "# NOTE(review): the unit of the heap size is defined by hops_beam.start -- presumably MB, confirm.\n",
    "TASKMANAGER_HEAP_SIZE = 8192\n",
    "hops_beam.start(taskmanager_heap_size=TASKMANAGER_HEAP_SIZE)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Get the portable runner pipeline options"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Ask the hops helper for the portable runner flags, then wrap them in PipelineOptions.\n",
    "pipeline_args = hops_beam.get_portable_runner_config()\n",
    "pipeline_options = PipelineOptions(flags=pipeline_args)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Run the WordCount example\n",
    "By default, the **input** is read from `gs://dataflow-samples/shakespeare/kinglear.txt`. Alternatively, you can save the input file in a dataset, for example `Resources`, and set the input to the path of that file, e.g. `os.path.join(hopsfs.project_path(exclude_nn_addr=True), 'Resources', 'kinglear.txt')`.\n",
    "\n",
    "By default, the **output** is written to the `Resources` dataset, using the file name prefix `wordcount.out`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#\n",
    "# Licensed to the Apache Software Foundation (ASF) under one or more\n",
    "# contributor license agreements.  See the NOTICE file distributed with\n",
    "# this work for additional information regarding copyright ownership.\n",
    "# The ASF licenses this file to You under the Apache License, Version 2.0\n",
    "# (the \"License\"); you may not use this file except in compliance with\n",
    "# the License.  You may obtain a copy of the License at\n",
    "#\n",
    "#    http://www.apache.org/licenses/LICENSE-2.0\n",
    "#\n",
    "# Unless required by applicable law or agreed to in writing, software\n",
    "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
    "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
    "# See the License for the specific language governing permissions and\n",
    "# limitations under the License.\n",
    "#\n",
    "\n",
    "class WordExtractingDoFn(beam.DoFn):\n",
    "  \"\"\"Parse each line of input text into words.\n",
    "\n",
    "  While splitting, Beam metrics are updated: a word counter, a running sum\n",
    "  of word lengths, a word-length distribution and a blank-line counter.\n",
    "  \"\"\"\n",
    "\n",
    "  def __init__(self):\n",
    "    # Initialise the DoFn base class explicitly before registering the metrics.\n",
    "    beam.DoFn.__init__(self)\n",
    "    self.words_counter = Metrics.counter(self.__class__, 'words')\n",
    "    self.word_lengths_counter = Metrics.counter(self.__class__, 'word_lengths')\n",
    "    self.word_lengths_dist = Metrics.distribution(\n",
    "        self.__class__, 'word_len_dist')\n",
    "    self.empty_line_counter = Metrics.counter(self.__class__, 'empty_lines')\n",
    "\n",
    "  def process(self, element):\n",
    "    \"\"\"Split one line of text into its words.\n",
    "\n",
    "    Requires the `re` module to be imported in the notebook's import cell.\n",
    "\n",
    "    Args:\n",
    "      element: the line of text being processed.\n",
    "    Returns:\n",
    "      A list with the words found in the line; the list is empty for a\n",
    "      blank line, which is also recorded in the 'empty_lines' counter.\n",
    "    \"\"\"\n",
    "    text_line = element.strip()\n",
    "    if not text_line:\n",
    "      self.empty_line_counter.inc(1)\n",
    "      # A blank line contains no words, so there is nothing left to match.\n",
    "      return []\n",
    "    # A word is a run of word characters and apostrophes.\n",
    "    words = re.findall(r'[\\w\\']+', text_line, re.UNICODE)\n",
    "    for w in words:\n",
    "      self.words_counter.inc()\n",
    "      self.word_lengths_counter.inc(len(w))\n",
    "      self.word_lengths_dist.update(len(w))\n",
    "    return words\n",
    "\n",
    "def count_ones(word_ones):\n",
    "    \"\"\"Sum the grouped ones of a (word, ones) pair into (word, total).\"\"\"\n",
    "    word, ones = word_ones\n",
    "    return (word, sum(ones))\n",
    "\n",
    "\n",
    "def format_result(word_count):\n",
    "    \"\"\"Format a (word, count) pair as the text line 'word: count'.\"\"\"\n",
    "    word, count = word_count\n",
    "    return '%s: %d' % (word, count)\n",
    "\n",
    "\n",
    "p = beam.Pipeline(options=pipeline_options)\n",
    "\n",
    "# Read the text file[pattern] into a PCollection.\n",
    "input_path = \"gs://dataflow-samples/shakespeare/kinglear.txt\"\n",
    "lines = p | 'read' >> ReadFromText(input_path)\n",
    "\n",
    "# Split every line into its words.\n",
    "words = lines | 'split' >> (\n",
    "    beam.ParDo(WordExtractingDoFn()).with_output_types(unicode))\n",
    "\n",
    "# Count the occurrences of each word.\n",
    "counts = (words\n",
    "          | 'pair_with_one' >> beam.Map(lambda x: (x, 1))\n",
    "          | 'group' >> beam.GroupByKey()\n",
    "          | 'count' >> beam.Map(count_ones))\n",
    "\n",
    "# Format the counts into a PCollection of strings.\n",
    "output = counts | 'format' >> beam.Map(format_result)\n",
    "\n",
    "# Write the output using a \"Write\" transform that has side effects.\n",
    "output_prefix = os.path.join(\n",
    "    hopsfs.project_path(exclude_nn_addr=True), 'Resources', 'wordcount.out')\n",
    "# pylint: disable=expression-not-assigned\n",
    "output | 'write' >> WriteToText(output_prefix)\n",
    "\n",
    "# Submit the pipeline and block until the job has finished.\n",
    "result = p.run()\n",
    "result.wait_until_finish()\n",
    "\n",
    "# Do not query metrics when creating a template which doesn't run\n",
    "if (not hasattr(result, 'has_job')    # direct runner\n",
    "        or result.has_job):           # not just a template creation\n",
    "    # Both metric queries live inside this guard: without a job that ran\n",
    "    # there are no metrics to query, and query_result would be undefined.\n",
    "    # NOTE: the values are reported as INFO log records, so they are only\n",
    "    # visible when logging is configured to emit the INFO level.\n",
    "    empty_lines_filter = MetricsFilter().with_name('empty_lines')\n",
    "    query_result = result.metrics().query(empty_lines_filter)\n",
    "    if query_result['counters']:\n",
    "        empty_lines_counter = query_result['counters'][0]\n",
    "        logging.info('number of empty lines: %d', empty_lines_counter.result)\n",
    "\n",
    "    word_lengths_filter = MetricsFilter().with_name('word_len_dist')\n",
    "    query_result = result.metrics().query(word_lengths_filter)\n",
    "    if query_result['distributions']:\n",
    "        word_lengths_dist = query_result['distributions'][0]\n",
    "        logging.info('average word length: %d', word_lengths_dist.result.mean)\n",
    "print('Done')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "python-flink_tutorial__meb10000",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.16"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}